package org.ykx.demo

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.dmg.pmml.True
import org.apache.spark.rdd.RDD
import java.util.Properties
import java.util.Random

/** Model row: identifies a breaker attached to a busbar section.
  *
  * @param busbarsection_id   device id of the busbar section
  * @param breaker_id         device id of the breaker
  * @param busbarsection_name human-readable busbar section name
  * @param breaker_name       human-readable breaker name
  */
final case class ModelData(busbarsection_id: String, breaker_id: String, busbarsection_name: String, breaker_name: String)
/** Streaming measurement row for one device at one point in time.
  *
  * @param device_id device id the measurement belongs to
  * @param time_mark timestamp string of the measurement
  * @param value     measured value, kept as a string
  * @param num       numeric sequence/identifier for the row
  */
final case class StreamingData(device_id: String, time_mark: String, value: String, num: Long)

object RddTest {

  // Shared Spark entry points for every demo method below.
  // NOTE(review): these are eager object-level vals, so merely touching
  // RddTest starts a local SparkContext.
  val conf = new SparkConf().setMaster("local").setAppName("Test1")
  val sc = new SparkContext(conf)

  /** Returns a pseudo-random integer in [0, 10). */
  def getNum(): Int = {
    val rand = new Random()
    rand.nextInt(10)
  }

  /** Demonstrates `for ... yield`: each iteration's result is remembered
    * and collected into a new list, which is then printed. */
  def yieldFor(): Unit = {
    val lst = List("I love Scala", "I love Spark", "I love Hadoop")
    val lengths =
      for {
        element <- lst
      } yield element + "：合计" + element.length + "个字。"

    lengths.foreach(println)
  }

  /** Classic word-count: pair every word with 1, then sum the counts per key. */
  def wordCount(): Unit = {
    val lstStr = List("apple", "orange", "peach", "cherry", "cherry", "peer",
                      "watermelon", "watermelon", "orange", "banana", "apple", "mogo",
                      "strawberry", "banana", "apple", "mogo", "banana", "mogo")
    // (fixed) removed an unused SQLContext that was allocated here and never used
    val wc = sc.parallelize(lstStr, 3).map(x => (x, 1)).reduceByKey(_ + _)
    wc.foreach(f => println(f._1 + " " + f._2))
  }

  /** Contrasts map with flatMap. "A" + 1 evaluates to the String "A1";
    * flatMap treats that String as a sequence and flattens it into
    * individual Chars, while map keeps one String per input element. */
  def mapAndflatMap(): Unit = {
    val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
    arr.flatMap(x => (x._1 + x._2)).foreach(println)
    println("===============================================")
    arr.map(x => (x._1 + x._2)).foreach(println)
  }

  /** Splits 1..9 into "even"/"odd" groups and prints each group. */
  def groupByDemo(): Unit = {
    val a = sc.parallelize(1 to 9, 3)
    a.groupBy { x => if (x % 2 == 0) "even" else "odd" }.foreach(println)
  }

  /** Prints the RDD's elements plus the results of top/take/takeOrdered:
    * top(n) = n largest (descending), take(n) = first n in partition order,
    * takeOrdered(n) = n smallest (ascending). */
  def topTest(): Unit = {
    val a = sc.parallelize(List(1, 342, 2, 44, 74, 21, 7, 68, 9, 110))
    // (fixed) original printed the Unit value of foreach ("[list]: ()");
    // collect the elements so the label is followed by the actual values.
    println("[list]: " + a.collect().mkString(" "))
    println("[top]: " + a.top(3).toList)
    println("[take]: " + a.take(3).toList)
    println("[takeOrdered]: " + a.takeOrdered(3).toList)
  }

  /** Multiplies 1..10 together and prints the product as a Float. */
  def reduceTest(): Unit = {
    val a = sc.parallelize(1 to 10, 1)
    val b = a.reduce(_ * _).floatValue()
    println(b)
  }

  /** lookup(key) returns the list of all values stored under that key. */
  def lookupTest(): Unit = {
    val a = sc.parallelize(List((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')))
    println(a.lookup(1).toList)
  }

  /** Sample model data keyed by device id (fixture for experiments). */
  def getModelRDD(): RDD[(String, String)] =
    // (fixed) dropped the redundant `return` keyword
    sc.parallelize(List(("DE-10053", "防城港220KV母线,DE-66365"), ("DE-66365", "千与千寻它馈线"),
      ("DE-10054", "佛山地市局110KV母线,DE-66366"), ("DE-66366", "你的名字馈线")), 1)

  /** Sample streaming measurements keyed by device id (fixture for experiments). */
  def getStreamingRDD(): RDD[(String, String)] =
    // (fixed) dropped the redundant `return` keyword
    sc.parallelize(List(("DE-10053", "2017-04-14 16:05:02,113.236"), ("DE-66365", "2017-04-14 16:05:02,1"),
      ("DE-10054", "2017-04-14 16:05:02,0.236"), ("DE-66366", "2017-04-14 16:05:02,1")), 1)

  /** Demo driver: builds model/streaming DataFrames, registers them as temp
    * tables, and shows the streaming frame without its `num` column.
    * Earlier one-off experiments are kept below as commented-out reference. */
  def main(args: Array[String]): Unit = {
//    println(getNum() + staticObject.love)
//    yieldFor()
//    mapAndflatMap()
//    groupByDemo()
//    topTest()
//    reduceTest()
//    lookupTest()

    // Model rows: (busbar id, breaker id, busbar name, breaker name).
    val modelRDD = sc.parallelize(List(("DE-10053", "DE-66365", "防城港220KV母线", "千与千寻开关"),
                                      ("DE-10053", "DE-66366", "防城港220KV母线", "沙发沙开关"),
                                      ("DE-10054", "DE-66367", "桂林110KV母线", "干嘛开关"),
                                      ("DE-10054", "DE-66368", "桂林110KV母线", "你的名字馈线"),
                                      ("DE-10055", "DE-66369", "柳州310KV母线", "开关a"),
                                      ("DE-10056", "DE-66370", "玉林110KV母线", "馈线B")), 1)

    // Streaming rows: (device id, timestamp, value, sequence number).
    val sdfRdd = sc.parallelize(List(("DE-10053", "2017-04-14 16:05:02", "113.236", 1000000),
                                          ("DE-66365", "2017-04-14 16:05:02", "0", 1010000),
                                          ("DE-10054", "2017-04-14 16:05:02", "0", 1000001),
                                          ("DE-66366", "2017-04-14 16:05:02", "1", 1000002),
                                          ("DE-10053", "2017-04-14 16:10:02", "225.2", 1000003),
                                          ("DE-66365", "2017-04-14 16:10:02", "0", 1000004),
                                          ("DE-10054", "2017-04-14 16:10:02", "0", 1000005),
                                          ("DE-66366", "2017-04-14 16:10:02", "1", 1000006)), 1)

    val bdfRdd = sc.parallelize(List(("DE-11111", "2017-04-14 16:05:02", "113.236", 1000000),
                                          ("DE-22222", "2017-04-14 16:05:02", "0", 1000012),
                                          ("DE-33333", "2017-04-14 16:05:02", "0", 1000042),
                                          ("DE-44444", "2017-04-14 16:05:02", "1", 1000052),
                                          ("DE-55555", "2017-04-14 16:10:02", "225.2", 1000001),
                                          ("DE-66666", "2017-04-14 16:10:02", "0", 10000123),
                                          ("DE-77777", "2017-04-14 16:10:02", "0", 10000003),
                                          ("DE-88888", "2017-04-14 16:10:02", "1", 10000002)), 1)
    val sqlContext = new SQLContext(sc)
    // Implicit conversions that add .toDF() to RDDs of case classes.
    import sqlContext.implicits._
    val mdf = modelRDD.map { line => ModelData(line._1, line._2, line._3, line._4) }.toDF()
    mdf.registerTempTable("model_data") // register as a temp table for SQL access

    val sdf = sdfRdd.map { line => StreamingData(line._1, line._2, line._3, line._4) }.toDF()
    sdf.registerTempTable("sdf_data")
    val bdf = bdfRdd.map { line => StreamingData(line._1, line._2, line._3, line._4) }.toDF()
    bdf.registerTempTable("bdf_data")
    println("============================TEST=============================")

//    val s = sdf.join(bdf, sdf("time_mark") === bdf("time_mark"))
//      .select(sdf("*"))
//      .show(100)

//    sdf.where("(num + 100) > 1001000").show()

    // Show the streaming frame without the surrogate "num" column.
    sdf.drop("num").show

//    val result_data = sqlContext.sql("select distinct m.breaker_id,s.time_mark,m.breaker_name,0 as psr_type,1 as stop_power_type from model_data m "+
//                                      "inner join streaming_data s on m.busbarsection_id=s.device_id  where s.value <10 " +
//                                      "union all "+
//                                      "select distinct t.breaker_id,t.time_mark,t.breaker_name,t.psr_type,t.stop_power_type from (	"+
//                                      "select m.breaker_id,s.time_mark,m.breaker_name,0 as psr_type,1 as stop_power_type from model_data m "+
//                                      "inner join streaming_data s on m.busbarsection_id=s.device_id where s.value >10 ) t                "+
//                                      "inner join streaming_data s1 on t.breaker_id=s1.device_id and t.time_mark=s1.time_mark "+
//                                      "where s1.value=1").show()

//    val prop = new Properties()
//    prop.put("user", "root")
//    prop.put("password", "123456")
//    data.write.mode("append").jdbc("jdbc:mysql://localhost:3306/test", "result_data", prop)

    // (fixed) release Spark resources before the JVM exits.
    sc.stop()
  }
}


//Singleton object (demo of object-level members)
private object staticObject {
  // Constant consumed by commented-out demo code in RddTest.main.
  val love = "520"

  // Symbolic method name kept as-is so existing references (staticObject.*)
  // still resolve; always yields the constant 1.
  def * : Int = 1
}